package io.openmessaging.demo;

import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PullConsumer;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Pull-style consumer that reads messages from the on-disk store through
 * {@code StoreUtils.readMessage}.
 *
 * <p>A consumer is attached to one queue and any number of topics via
 * {@link #attachQueue(String, Collection)}; each of them is tracked as a
 * "bucket" name ({@code queue-<name>} / {@code topic-<name>}).
 *
 * <p>{@link #poll()} and {@link #attachQueue(String, Collection)} are
 * {@code synchronized} on the consumer instance.
 */
public class DefaultPullConsumer implements PullConsumer {
    private final KeyValue properties;
    /** Name of the attached queue, or {@code null} until {@link #attachQueue} is called. */
    private String queue;
    /** Bucket names ("queue-..." and "topic-...") handed to {@code StoreUtils.readMessage}. */
    private final List<String> buckets = new ArrayList<>();
    /** Store directory, read from the "STORE_PATH" property with "store.path" as fallback. */
    private final String storeDirPath;


    /** Ensures the one-time scan of the store directory runs for the first consumer only. */
    private static final AtomicBoolean flag = new AtomicBoolean(false);

    /** Sum computed by the one-time store scan; stays 0 until the first consumer is created. */
    public static long wroteSize = 0;

    /**
     * Creates a consumer reading from the store directory named in {@code properties}.
     *
     * <p>The first instance created in the JVM also scans the store directory to
     * populate {@link #wroteSize} and prints the supplied properties.
     *
     * @param properties consumer configuration; the store directory is taken from
     *                   "STORE_PATH", falling back to "store.path"
     */
    public DefaultPullConsumer(KeyValue properties) {
        this.properties = properties;
        String configuredPath = properties.getString("STORE_PATH");
        if (configuredPath == null) {
            configuredPath = properties.getString("store.path");
        }
        this.storeDirPath = configuredPath;

        if (flag.compareAndSet(false, true)) {
            wroteSize = getWroteSize(storeDirPath);
            MiscUtils.printKVS("初始化customer时的kvs", properties);
        }

    }

    /**
     * Sums the leading 4-byte int of every {@code msg.*} file directly inside
     * {@code path} and prints the total.
     *
     * <p>The scan is best effort: a {@code null} path, a path that is not a readable
     * directory, or a file that cannot be read contributes nothing to the sum
     * instead of aborting construction of the consumer.
     *
     * @param path store directory to scan; may be {@code null}
     * @return the sum of the leading ints of all readable {@code msg.*} files
     */
    private long getWroteSize(String path) {
        long writeSize = 0;
        if (path != null) {
            // listFiles always passes the listed directory itself as "dir", so only
            // the file name needs to be checked.
            FilenameFilter messageFiles = (dir, name) -> name.startsWith("msg.");
            // listFiles returns null when the path is not a directory or cannot be read.
            File[] subFiles = new File(path).listFiles(messageFiles);
            if (subFiles != null) {
                for (File subFile : subFiles) {
                    // try-with-resources closes the file even when readInt fails,
                    // e.g. with EOFException on a file shorter than 4 bytes.
                    try (RandomAccessFile raf = new RandomAccessFile(subFile, "r")) {
                        // NOTE(review): presumably the header int is the number of
                        // bytes written to that file - confirm against the writer.
                        writeSize += raf.readInt();
                    } catch (IOException | SecurityException e) {
                        // Skip the unreadable file and keep summing the others.
                        e.printStackTrace();
                    }
                }
            }
        }
        System.out.println("总共写入了 " + writeSize + " byte数据");
        return writeSize;
    }


    /**
     * Returns the properties this consumer was created with.
     */
    @Override
    public KeyValue properties() {
        return properties;
    }


    /**
     * Reads the next message for the attached queue and topics.
     *
     * @return the value returned by {@code StoreUtils.readMessage}, or {@code null}
     *         when no queue has been attached yet or reading fails with an
     *         {@link IOException}
     */
    @Override
    public synchronized Message poll() {
        if (buckets.isEmpty() || queue == null) {
            return null;
        }
        try {
            return StoreUtils.readMessage(queue, storeDirPath, buckets);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Message poll(KeyValue properties) {
        throw new UnsupportedOperationException("Unsupported");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void ack(String messageId) {
        throw new UnsupportedOperationException("Unsupported");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void ack(String messageId, KeyValue properties) {
        throw new UnsupportedOperationException("Unsupported");
    }

    /**
     * Attaches this consumer to a queue and a set of topics.
     *
     * <p>Every call appends {@code queue-<queueName>} and one {@code topic-<topic>}
     * entry per topic to the bucket list; entries from earlier calls are kept.
     *
     * @param queueName name of the queue to attach to
     * @param topics    topics to consume in addition to the queue
     * @throws ClientOMSException if a different queue is already attached
     */
    @Override
    public synchronized void attachQueue(String queueName, Collection<String> topics) {
        if (queue != null && !queue.equals(queueName)) {
            throw new ClientOMSException("You have alreadly attached to a queue " + queue);
        }
        queue = queueName;
        buckets.add("queue-" + queueName);
        for (String topic : topics) {
            buckets.add("topic-" + topic);
        }
    }


}
